feat(grpc): implement continuous Watch streaming for health servicers #917
V2arK wants to merge 11 commits into lightseekorg:main from
Conversation
📝 Walkthrough

Adds a reusable HealthWatchMixin implementing the gRPC Health Watch streaming RPC, refactors the SGLang and vLLM health servicers to use the mixin (removing their in-class Watch implementations), bumps the package version and test extras, and introduces async tests and test fixtures for watch behavior.
Sequence Diagram(s)

sequenceDiagram
participant Client
participant WatchMixin as HealthWatchMixin
participant Servicer
participant Backend as Scheduler/LLM
participant ShutdownEvent as _watch_shutdown_event
Client->>WatchMixin: Watch(request)
WatchMixin->>Servicer: _resolve_watch_status(service)
Servicer->>Backend: compute health (sync/async)
Backend-->>Servicer: status (SERVING/NOT_SERVING/UNKNOWN)
Servicer-->>WatchMixin: status
WatchMixin-->>Client: yield HealthCheckResponse(status)
loop poll until shutdown or cancel
alt status changes
WatchMixin-->>Client: yield updated HealthCheckResponse
else no change
WatchMixin->>ShutdownEvent: wait(timeout=WATCH_POLL_INTERVAL_S)
end
alt shutdown notified or cancelled
WatchMixin-->>Client: (final status if needed) and end stream
end
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Code Review
This pull request introduces a shared HealthWatchMixin to implement the gRPC Health Checking Protocol's Watch RPC for both SGLang and vLLM inference engines. The mixin provides a continuous streaming response that updates clients on health status changes or server shutdown. The PR also includes a version bump to 0.6.0, the addition of test-specific dependencies in pyproject.toml, and a comprehensive suite of unit tests. Feedback was provided regarding the use of inspect.isawaitable() for more robust detection of asynchronous results in the mixin's status resolution logic.
async def _resolve_watch_status(self, service_name: str) -> int:
    """Call _compute_watch_status, handling both sync and async impls."""
    result = self._compute_watch_status(service_name)
    if asyncio.iscoroutine(result):
For more robust detection of awaitable results from _compute_watch_status, it's better to use inspect.isawaitable() instead of asyncio.iscoroutine(). isawaitable() is more general and correctly handles not just coroutines from async def functions, but also other awaitable objects like asyncio.Future or custom objects with an __await__ method. This makes the mixin more resilient to different implementation patterns in subclasses.
You'll need to add import inspect at the top of the file.
-    if asyncio.iscoroutine(result):
+    if inspect.isawaitable(result):
Accepted. Switched to inspect.isawaitable() -- it's the idiomatic pattern for this and handles the sync/async dispatch more robustly.
Note: the broader rationale about asyncio.Future and custom __await__ objects doesn't apply here since _compute_watch_status only ever returns int (sync) or a coroutine (async def), but isawaitable is still the right call.
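The resulting dispatch can be sketched in isolation. The class bodies below are illustrative stand-ins, not the actual servicers; only the isawaitable-based dispatch mirrors what the mixin does:

```python
import asyncio
import inspect

# Stand-ins for the two servicer styles; bodies are invented for the demo.
class SyncImpl:
    def _compute_watch_status(self, service_name):
        return 1  # e.g. SERVING, computed synchronously (SGLang style)

class AsyncImpl:
    async def _compute_watch_status(self, service_name):
        await asyncio.sleep(0)  # stands in for awaiting an engine health call
        return 1

async def resolve_watch_status(servicer, service_name):
    result = servicer._compute_watch_status(service_name)
    if inspect.isawaitable(result):  # handles coroutines, Futures, __await__
        result = await result
    return result

async def main():
    return (
        await resolve_watch_status(SyncImpl(), ""),
        await resolve_watch_status(AsyncImpl(), ""),
    )

print(asyncio.run(main()))  # (1, 1)
```

Both call sites look identical to the mixin; each subclass keeps its natural sync or async signature.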
@coderabbitai review
✅ Actions performed: Review triggered.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 012e5f6e16
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
        self._watch_shutdown_event.wait(),
        timeout=self.WATCH_POLL_INTERVAL_S,
    )
except TimeoutError:
Catch asyncio.TimeoutError in Watch poll loop
Watch() currently catches built-in TimeoutError, but on Python 3.10 (which is supported via requires-python >=3.10) asyncio.wait_for() raises asyncio.TimeoutError instead. When a stream is healthy and no shutdown event occurs for one poll interval, that timeout escapes the loop and aborts the RPC rather than continuing to poll, so long-lived watch streams terminate unexpectedly in normal operation.
Good catch -- this is a real bug on Python 3.10.
On 3.10, asyncio.TimeoutError inherits from Exception directly, NOT from the builtin TimeoutError. So except TimeoutError: misses it entirely, causing the poll loop to crash on every interval when no shutdown event fires. On 3.11+, they unified the two (asyncio.TimeoutError is builtins.TimeoutError), which is why this wasn't caught in local testing.
Fixed: except asyncio.TimeoutError: -- works on both 3.10 and 3.11+ since it's the same class on 3.11+.
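A minimal, self-contained illustration of the fixed poll step; the event and interval are stand-ins for the mixin's real attributes:

```python
import asyncio

async def poll_once(event: asyncio.Event, interval: float) -> bool:
    """One iteration of the poll step: True if shutdown was signalled."""
    try:
        await asyncio.wait_for(event.wait(), timeout=interval)
        return True   # event set: shut down
    except asyncio.TimeoutError:  # same class as builtin TimeoutError on 3.11+
        return False  # interval elapsed: keep polling

async def main():
    ev = asyncio.Event()
    timed_out = await poll_once(ev, 0.01)  # nothing set: times out
    ev.set()
    signalled = await poll_once(ev, 0.01)  # wakes immediately
    return timed_out, signalled

print(asyncio.run(main()))  # (False, True)
```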
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/smg_grpc_servicer/sglang/health_servicer.py`:
- Around line 149-176: Extract the hard-coded 30s timeout into a class-level
constant (e.g., SCHEDULER_RESPONSIVENESS_TIMEOUT_S) and replace the literal 30
in both _compute_watch_status and Check with
self.SCHEDULER_RESPONSIVENESS_TIMEOUT_S; add the constant to the class
definition, update the time_since comparison in _compute_watch_status and the
corresponding check in Check() to use that constant, and ensure any tests or
other methods referencing the 30s behavior use the new constant name.
📒 Files selected for processing (8)
grpc_servicer/pyproject.toml, grpc_servicer/smg_grpc_servicer/health_watch.py, grpc_servicer/smg_grpc_servicer/sglang/health_servicer.py, grpc_servicer/smg_grpc_servicer/vllm/health_servicer.py, grpc_servicer/tests/__init__.py, grpc_servicer/tests/conftest.py, grpc_servicer/tests/test_sglang_health_watch.py, grpc_servicer/tests/test_vllm_health_watch.py
Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
TDD red phase: 7 tests for SGLangHealthServicer.Watch() continuous streaming. 5 fail against current single-yield implementation. Adds sglang MagicMock stubs to conftest to allow collection without a full SGLang installation. Signed-off-by: Honglin Zhu <honglin@nvidia.com> Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
TDD red phase: 7 tests for VllmHealthServicer.Watch() continuous streaming. 3 fail (exits_on_shutdown, engine_failure, no_duplicate) as expected; 4 pass against current single-yield stub. Also adds vllm module stubs to conftest so tests collect without vLLM installed. Signed-off-by: Honglin <honglin@nvidia.com> Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
_notify_shutdown() now also sets self._watch_notified_shutdown = True so subclasses can detect explicit shutdown (via set_not_serving()) in _is_shutting_down() independently of their engine-specific flags. Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
Force-pushed 012e5f6 to 9adb736
- Use inspect.isawaitable() instead of asyncio.iscoroutine() for idiomatic sync/async dispatch in _resolve_watch_status() - Fix Python 3.10 bug: catch asyncio.TimeoutError instead of builtin TimeoutError in Watch poll loop (they diverge on <3.11) - Extract scheduler responsiveness timeout (30s) as class constant SCHEDULER_RESPONSIVENESS_TIMEOUT_S to eliminate magic number - Update test conftest stubs to match current import graph Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
All three review items addressed in a79ac91.
Also updated test conftest stubs to match current upstream import graph. 14/14 tests passing.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a79ac91799
| if self._is_shutting_down(): | ||
| return |
Emit shutdown status before terminating Watch stream
This early return can drop a real status transition during shutdown: if set_not_serving() runs after status was computed for the current iteration but before this check executes, the stream exits immediately without sending the final NOT_SERVING update. In that race window, Watch clients only observe EOF and miss the health-state change event they rely on for routing decisions.
Evaluated -- pushing back on this one.
The race requires set_not_serving() to fire between _resolve_watch_status() returning and _is_shutting_down() executing. In asyncio's single-threaded cooperative model:
- SGLang: _compute_watch_status() is sync -- no yield point, race is impossible.
- vLLM: async via await check_health(), but the window between the return of _resolve_watch_status() and the _is_shutting_down() check is a single Python statement with no await -- no coroutine switch can happen there.
Even in the theoretical case where shutdown lands between _resolve_watch_status yielding control (during check_health()) and the shutdown check: the client would see SERVING then EOF. Any conformant Watch client treats EOF as "server unavailable" -- the gRPC transport-level disconnect is the primary shutdown signal, not an in-band NOT_SERVING message.
Not worth the added complexity.
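The cooperative-scheduling argument can be demonstrated directly. This toy (not from the PR) shows that a pending task cannot interleave between two statements that contain no await:

```python
import asyncio

async def race_window():
    flag = {"shutdown": False}

    async def set_shutdown():
        flag["shutdown"] = True

    task = asyncio.create_task(set_shutdown())  # scheduled, not yet run
    # No await between these two reads, so the event loop cannot switch to
    # set_shutdown() in between: both necessarily observe the same value.
    first = flag["shutdown"]
    second = flag["shutdown"]
    await task  # only now does the setter actually run
    return first, second, flag["shutdown"]

print(asyncio.run(race_window()))  # (False, False, True)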
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
grpc_servicer/smg_grpc_servicer/vllm/health_servicer.py (1)
61-97: 🧹 Nitpick | 🔵 Trivial: Factor status resolution into one helper before Check() and Watch() drift apart.
This file now has two copies of the same shutdown/service-dispatch/health-mapping logic. A shared helper returning just the status enum would keep the RPCs aligned, with Check() remaining responsible for context.set_code()/context.set_details() on unknown services.
Also applies to: 99-119
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@grpc_servicer/smg_grpc_servicer/vllm/health_servicer.py` around lines 61 - 97, Extract the duplicated shutdown/service-dispatch/health-mapping logic into a single async helper (e.g., async def _get_health_status(self, service_name) -> int) that returns a health_pb2.HealthCheckResponse status enum; the helper should check self._shutting_down, map OVERALL_SERVER/VLLM_SERVICE to awaiting self.async_llm.check_health() and return SERVING or NOT_SERVING accordingly, and return SERVICE_UNKNOWN for unknown services; update Check() and Watch() to call this helper for the status resolution while keeping Check() responsible for setting context.set_code()/set_details() when the helper returns SERVICE_UNKNOWN.
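A hedged sketch of the helper this prompt describes. The Status values, service-name constants, and check_health body are stand-ins for health_pb2 and the real servicer internals, not the actual file:

```python
import asyncio
from enum import IntEnum

# Stand-ins for health_pb2 statuses and the real service-name constants.
class Status(IntEnum):
    SERVING = 1
    NOT_SERVING = 2
    SERVICE_UNKNOWN = 3

OVERALL_SERVER = ""       # assumed: empty name means the whole server
VLLM_SERVICE = "vllm"     # hypothetical service name

class VllmHealthSketch:
    def __init__(self, healthy: bool = True):
        self._shutting_down = False
        self._healthy = healthy

    async def check_health(self):
        # Stands in for self.async_llm.check_health(), which raises on failure.
        if not self._healthy:
            raise RuntimeError("engine dead")

    async def _get_health_status(self, service_name: str) -> int:
        if self._shutting_down:
            return Status.NOT_SERVING
        if service_name in (OVERALL_SERVER, VLLM_SERVICE):
            try:
                await self.check_health()
                return Status.SERVING
            except Exception:
                return Status.NOT_SERVING
        return Status.SERVICE_UNKNOWN
```

Under this split, Check() would additionally call context.set_code()/set_details() when the helper returns SERVICE_UNKNOWN, while Watch() just yields the status.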
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/tests/test_vllm_health_watch.py`:
- Around line 99-109: The test test_watch_handles_client_cancel is closing the
generator with gen.aclose(), which raises GeneratorExit rather than triggering
asyncio.CancelledError handled by HealthWatchMixin.Watch(); update the test to
create an asyncio Task that awaits gen.__anext__ (or awaits the generator
consumption) and then cancel that Task to simulate a real client disconnect so
the Watch() CancelledError branch is exercised; refer to servicer.Watch and
HealthWatchMixin.Watch() when making the change (apply the same pattern to the
SGLang sibling test).
---
Outside diff comments:
In `@grpc_servicer/smg_grpc_servicer/vllm/health_servicer.py`:
- Around line 61-97: Extract the duplicated
shutdown/service-dispatch/health-mapping logic into a single async helper (e.g.,
async def _get_health_status(self, service_name) -> int) that returns a
health_pb2.HealthCheckResponse status enum; the helper should check
self._shutting_down, map OVERALL_SERVER/VLLM_SERVICE to awaiting
self.async_llm.check_health() and return SERVING or NOT_SERVING accordingly, and
return SERVICE_UNKNOWN for unknown services; update Check() and Watch() to call
this helper for the status resolution while keeping Check() responsible for
setting context.set_code()/set_details() when the helper returns
SERVICE_UNKNOWN.
…ests Replace aclose()-based cancel tests with Task.cancel() to exercise the actual asyncio.CancelledError path inside Watch(), matching real gRPC client disconnect behavior. aclose() only triggers GeneratorExit. Signed-off-by: Honglin Cao <Caohonglin317@hotmail.com>
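The Task.cancel() pattern the commit describes can be sketched as follows; fake_watch stands in for servicer.Watch and the real test fixtures:

```python
import asyncio

async def fake_watch():
    # Stands in for servicer.Watch(request_msg, grpc_context).
    while True:
        await asyncio.sleep(0.01)
        yield "SERVING"

async def consume(stream):
    async for _ in stream:
        pass

async def cancel_pattern():
    task = asyncio.create_task(consume(fake_watch()))
    await asyncio.sleep(0.05)  # let the stream run for a few intervals
    task.cancel()              # simulates a real client disconnect
    try:
        await task
    except asyncio.CancelledError:
        pass  # the CancelledError path inside Watch() is now exercised
    return task.cancelled()

print(asyncio.run(cancel_pattern()))  # True
```

Unlike gen.aclose(), which raises GeneratorExit inside the generator, cancelling the consuming Task injects asyncio.CancelledError at the await point, matching how a gRPC client disconnect surfaces in the servicer.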
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/tests/test_sglang_health_watch.py`:
- Around line 41-45: The test uses open-ended async for loops over
servicer.Watch which can hang if shutdown signaling breaks; replace these with a
bounded collector using asyncio.wait_for by creating a helper (e.g.,
_collect_watch) that runs the async iterator and accumulates response.status,
then call asyncio.wait_for(_collect_watch(servicer.Watch(request_msg,
grpc_context)), timeout=SHORT_TIMEOUT) in tests; update occurrences referencing
servicer.Watch and servicer.set_not_serving (lines around the current loops) to
use the helper so tests fail fast instead of hanging.
In `@grpc_servicer/tests/test_vllm_health_watch.py`:
- Around line 34-39: The streaming test uses servicer.Watch and can hang if the
stream never terminates; wrap the async collection in an explicit timeout using
asyncio.wait_for (or your existing async helper) so the test fails fast.
Specifically, change the block that iterates over async for response in
servicer.Watch(request_msg, grpc_context) (and similar blocks around
servicer.set_not_serving) to collect the responses inside an await
asyncio.wait_for(..., timeout=some_seconds) call (e.g., gather into a coroutine
that consumes one response then calls servicer.set_not_serving) so the test
raises on timeout instead of hanging.
📒 Files selected for processing (2)
grpc_servicer/tests/test_sglang_health_watch.py, grpc_servicer/tests/test_vllm_health_watch.py
async for response in servicer.Watch(request_msg, grpc_context):
    received.append(response.status)
    if len(received) == 1:
        servicer.set_not_serving()
🧹 Nitpick | 🔵 Trivial
Bound stream-consumption waits to prevent indefinite test hangs on regressions.
These async for loops are open-ended and rely on side effects to terminate. If a future change breaks shutdown signaling, this module can hang instead of failing fast. Wrap stream collection in asyncio.wait_for(...) (or a shared helper) with a small timeout.
⏱️ Suggested pattern

+async def _collect_watch(servicer, request_msg, grpc_context, timeout_s=1.0):
+    async def _run():
+        items = []
+        async for response in servicer.Watch(request_msg, grpc_context):
+            items.append(response.status)
+        return items
+    return await asyncio.wait_for(_run(), timeout=timeout_s)

Then use _collect_watch(...) in each test instead of a bare open-ended async for.
Also applies to: 63-66, 83-86, 120-123, 144-148, 172-175
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@grpc_servicer/tests/test_sglang_health_watch.py` around lines 41 - 45, The
test uses open-ended async for loops over servicer.Watch which can hang if
shutdown signaling breaks; replace these with a bounded collector using
asyncio.wait_for by creating a helper (e.g., _collect_watch) that runs the async
iterator and accumulates response.status, then call
asyncio.wait_for(_collect_watch(servicer.Watch(request_msg, grpc_context)),
timeout=SHORT_TIMEOUT) in tests; update occurrences referencing servicer.Watch
and servicer.set_not_serving (lines around the current loops) to use the helper
so tests fail fast instead of hanging.
async for response in servicer.Watch(request_msg, grpc_context):
    received.append(response.status)
    if len(received) == 1:
        servicer.set_not_serving()

assert received[0] == SERVING
🧹 Nitpick | 🔵 Trivial
Add explicit timeout bounds around stream collection to avoid stuck CI jobs.
These tests currently depend on Watch termination side effects; if termination regresses, they can hang. Use asyncio.wait_for (directly or via helper) so failures surface deterministically.
Also applies to: 68-71, 91-94, 126-129, 147-151, 162-166
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@grpc_servicer/tests/test_vllm_health_watch.py` around lines 34 - 39, The
streaming test uses servicer.Watch and can hang if the stream never terminates;
wrap the async collection in an explicit timeout using asyncio.wait_for (or your
existing async helper) so the test fails fast. Specifically, change the block
that iterates over async for response in servicer.Watch(request_msg,
grpc_context) (and similar blocks around servicer.set_not_serving) to collect
the responses inside an await asyncio.wait_for(..., timeout=some_seconds) call
(e.g., gather into a coroutine that consumes one response then calls
servicer.set_not_serving) so the test raises on timeout instead of hanging.
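The timeout-bounding idea can be shown with a self-contained toy; endless_watch simulates a regressed stream that never terminates:

```python
import asyncio

async def endless_watch():
    # Simulates a regressed Watch stream that never terminates.
    while True:
        await asyncio.sleep(0.01)
        yield "SERVING"

async def collect(stream):
    items = []
    async for status in stream:
        items.append(status)
    return items

async def main():
    try:
        await asyncio.wait_for(collect(endless_watch()), timeout=0.1)
        return "completed"
    except asyncio.TimeoutError:
        return "timed out"  # the test fails fast instead of hanging CI

print(asyncio.run(main()))  # timed out
```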
Description
Problem
SGLangHealthServicer.Watch() and VllmHealthServicer.Watch() yield a single response then close the stream. This violates the gRPC Health Checking Protocol, which requires Watch to be a long-lived server-streaming RPC that sends updates whenever the service's health status changes.
Additionally, SGLangHealthServicer.Watch() delegates to self.Check(), which calls context.set_code(NOT_FOUND) and context.set_details() for unknown services, polluting the streaming response context.
Follow-up from #885. Ref: vllm-project/vllm#38016.
Solution
Add HealthWatchMixin providing the Watch loop skeleton (poll + asyncio.Event for immediate shutdown wakeup, yield-on-change, cancel handling). Both servicers integrate the mixin and implement _compute_watch_status() and _is_shutting_down() (vLLM's implementation is async, via await async_llm.check_health()).
The mixin's _resolve_watch_status() bridge method auto-detects sync vs async implementations via asyncio.iscoroutine(), so each servicer uses its natural calling convention.
Spec deviation: for unknown services, the stream sends SERVICE_UNKNOWN once then exits (the spec says to keep the stream open for dynamic registration, but smg services are statically defined).
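Assuming the description above, the mixin's loop might look roughly like this sketch. It uses integer statuses instead of health_pb2 messages and omits the gRPC context; only the poll/yield-on-change/shutdown shape mirrors the design:

```python
import asyncio
from enum import IntEnum

# Stand-in for health_pb2.HealthCheckResponse status values.
class Status(IntEnum):
    SERVING = 1
    NOT_SERVING = 2

class HealthWatchSketch:
    WATCH_POLL_INTERVAL_S = 0.05

    def __init__(self):
        self._watch_shutdown_event = asyncio.Event()

    def set_not_serving(self):
        self._watch_shutdown_event.set()

    def _is_shutting_down(self):  # subclass hook in the real mixin
        return self._watch_shutdown_event.is_set()

    def _compute_watch_status(self, service_name):  # subclass hook
        return Status.NOT_SERVING if self._is_shutting_down() else Status.SERVING

    async def Watch(self, service_name):
        last = None
        while True:
            shutting = self._is_shutting_down()
            status = self._compute_watch_status(service_name)
            if status != last:  # yield only on change
                last = status
                yield status
            if shutting:        # final status already sent above
                return
            try:                # poll, but wake immediately on shutdown
                await asyncio.wait_for(
                    self._watch_shutdown_event.wait(),
                    timeout=self.WATCH_POLL_INTERVAL_S,
                )
            except asyncio.TimeoutError:
                pass
```

A client that triggers set_not_serving() after the first response sees SERVING followed by NOT_SERVING before the stream closes, matching the sequence diagram's "final status if needed" branch.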
Unit tests: 14/14 passed (macOS + x86 Linux)
vLLM E2E Watch deferred -- requires vllm-project/vllm#38016 to register grpc.health.v1.Health in the gRPC server.
Checklist
- cargo +nightly fmt passes (no Rust changes)
- cargo clippy --all-targets --all-features -- -D warnings passes (no Rust changes)
Summary by CodeRabbit
Chores
New Features
Refactor
Tests